package com.markhsiu.minimq.remote.transport.netty;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.markhsiu.minimq.client.AbstractClient;
import com.markhsiu.minimq.client.CallBackCache;
import com.markhsiu.minimq.core.constant.ConstantUtil;
import com.markhsiu.minimq.core.exeption.MiniMQException;
import com.markhsiu.minimq.core.thread.ThreadFactoryBuilder;
import com.markhsiu.minimq.message.Message;
import com.markhsiu.minimq.message.Result;
import com.markhsiu.minimq.remote.Address;
import com.markhsiu.minimq.remote.transport.netty.handler.NettyClientHandler;
import com.markhsiu.minimq.serialize.protostuff.ProtostuffCodec;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

/**
 *  netty通信客户端
 * Created by Mark Hsiu on 2017/2/8.
 */
public class NettyClient extends AbstractClient {
	 
	
	private int THREADS = ConstantUtil.getWorkerThreads();
	private static final ThreadFactory workerThreadFactory 
			= new ThreadFactoryBuilder().setNameFormat("client-default").build();
	private EventLoopGroup worker = new NioEventLoopGroup(1);
	private DefaultEventExecutorGroup defaultEventExecutorGroup 
		= new DefaultEventExecutorGroup(THREADS, workerThreadFactory);
    
	private Bootstrap bootstrap;
	private Channel messageChannel;

	public NettyClient() {	
		super(null);
	}

	public NettyClient(Address serverURL) {
		super(serverURL);
	}

	 
	@Override
	public Result send(Message message) {
		final Result result = new Result();
		String messageID = message.getMessageID();
		
		try {
			CallBackCache.putCallBack(messageID, result);
			ChannelFuture channel = messageChannel.writeAndFlush(message);
			channel.addListener(new ChannelFutureListener() {
				public void operationComplete(ChannelFuture future) throws Exception {
					if (!future.isSuccess()) {
						logger.error(future.cause().toString());
						result.setError(1);
					}
				}
			});

			Message ask = result.getMessageResult(10, TimeUnit.SECONDS);
			if (ask == null) {
				if (result.getError() == 1) {
					throw new MiniMQException("响应超时");
				} else {
					logger.error("结果为null");
				}
			}
			result.setMessage(ask);
		} finally {
			CallBackCache.removeCallBack(messageID);
		}
		
		
		return result;
	}


	@Override
	public void init() {
		bootstrap = new Bootstrap();
		bootstrap.group(worker)
			.channel(NioSocketChannel.class)
			.option(ChannelOption.TCP_NODELAY, true)
			.option(ChannelOption.SO_KEEPALIVE, false)
			.option(ChannelOption.SO_SNDBUF, ConstantUtil.socketSndBufSize)
            .option(ChannelOption.SO_RCVBUF, ConstantUtil.socketRcvBufSize)
			.handler(new ChilderChannelHandler());

	}

	@Override
	public void connect() {
		
		if (bootstrap == null) {
			throw new MiniMQException(" you need open Client before connect");
		}

		try {
			ChannelFuture future = bootstrap.connect(addr.getHostname(), addr.getPort()).sync();
			active = true;
			future.addListener(new ChannelFutureListener() {
				public void operationComplete(ChannelFuture future) throws Exception {
					Channel channel = future.channel();
					messageChannel = channel;
				}
			});

		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	

	@Override
	public void close() {
		try {
			if (messageChannel != null) {
				messageChannel.close().sync();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			worker.shutdownGracefully();
			active = false;
		}

	}

	private class ChilderChannelHandler extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(SocketChannel channel) throws Exception {
			ChannelPipeline pip = channel.pipeline();
			// 1. 禁止对类加载器进行缓存，它在基于OSGI的动态模块化编程中经常使用
//			pip.addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
//			pip.addLast(new ObjectEncoder());
			pip.addLast(defaultEventExecutorGroup);
			pip.addLast(new MessageObjectEncoder(ProtostuffCodec.OneInstance()));
			pip.addLast(new MessageObjectDecoder(ProtostuffCodec.OneInstance()));
			pip.addLast("timeout",new IdleStateHandler(0, 0, 120));
			pip.addLast(new NettyClientHandler(callBackor));
		}
	}

	

}
